package com.migrate.module.service;

import cn.hutool.core.collection.CollUtil;
import com.migrate.module.config.ApplicationContextUtil;
import com.migrate.module.domain.EtlBinlogConsumeRecord;
import com.migrate.module.enumeration.ConsumerStatus;
import com.migrate.module.mapper.migrate.EtlBinlogConsumeRecordMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.Collection;
import java.util.List;

/**
 * binlog消息拉取提交任务
 *
 * @author zhonghuashishan
 */
@Slf4j
public class CanalPullCommitRunner implements Runnable
{
    /**
     * 消息主题
     */
    private final String topic;
    /**
     * rocketmq的nameServer地址
     */
    private final String nameServerUrl;
    /**
     * binlog消息同步消费记录表Mapper
     */
    private final EtlBinlogConsumeRecordMapper consumeRecordMapper;
    /**
     * 消息拉取提交任务构造方法
     * @param topic 消息主题
     * @param nameServerUrl rocketmq的nameServer地址
     */
    public CanalPullCommitRunner(String topic, String nameServerUrl)
    {
        this.topic = topic;
        this.nameServerUrl = nameServerUrl;
        this.consumeRecordMapper = ApplicationContextUtil.getBean(EtlBinlogConsumeRecordMapper.class);
    }
    @Override
    public void run()
    {
        try
        {
            DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("binlogCommitConsumer");
            litePullConsumer.setAutoCommit(false); // 对他来说，他也是关闭了auto commit
            litePullConsumer.setNamesrvAddr(nameServerUrl);
            litePullConsumer.start();
            commitRun(litePullConsumer);
        }
        catch (MQClientException e)
        {
            log.error("消息提交失败", e);
        }
    }
    /**
     * 执行消息提交
     * @param consumer 消息拉取消费者
     */
    private void commitRun (DefaultLitePullConsumer consumer)
    {
        try
        {
            // 线程一旦启动，会先把topic里的queue拉取过来，他就知道你有多少queue
            // 把你的topic里所有的queue，都分配给你当前的这个consumer，当前的consumer他是可以拿到所有的queue
            Collection<MessageQueue> messageQueues = consumer.fetchMessageQueues(topic);
            consumer.assign(messageQueues);

            try {
                // 这里负责重试
                while (true) {
                    // 取得所有已消费未提交的记录
                    // 每一个binlog都会对应一条consume record，没有提交的record都查出来
                    List <EtlBinlogConsumeRecord> consumedRecords = consumeRecordMapper.getNotCommittedConsumedRecords(topic);
                    if (CollUtil.isNotEmpty(consumedRecords))
                    {
                        // 对每一个consumer record做遍历
                        for (EtlBinlogConsumeRecord consumedRecord : consumedRecords)
                        {
                            // 每个binlog都是rocketmq里的一条消息，topic、queue、offset、message -> binlog
                            // 基于consumer，seek，直接seek定位到topic、queue、offset那个位置去
                            consumer.seek(new MessageQueue(consumedRecord.getTopic(), consumedRecord.getBrokerName(), consumedRecord.getQueueId()), consumedRecord.getOffset());
                            //这一步必须，不然手动提交的东西不对，做一个poll操作，从我们指定的位置poll拉取过来一批数据
                            List<MessageExt> messageExts = consumer.poll();
                            // 提交已消费的消息，拉取过来一批消息过后，对这批消息，就可以去执行commit，把这批消息被处理成功的offset做一个提交
                            consumer.commitSync();
                            // 更新消费记录状态为已提交，再把这条消息消费记录的状态，修改为committed，就正式认为说这条消息他就已经提交成功了
                            consumedRecord.setConsumeStatus(ConsumerStatus.COMMITTED.getValue());
                            consumeRecordMapper.updateConsumeRecordStatus(consumedRecord);
                        }
                    }
                    else
                    {
                        Thread.sleep(5000);
                    }
                }
            }
            finally {
                consumer.shutdown();
            }
        }
        catch (MQClientException | InterruptedException e)
        {
            try
            {
                // 假设要拉取消息的主题还不存在，则会抛出异常，这种情况下休眠五秒再重试
                Thread.sleep(5000);
                commitRun (consumer);
            }
            catch (InterruptedException interruptedException)
            {
                log.error("消息拉取服务启动失败！", e);
            }
        }
    }
}
